{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "import configparser\n",
    "from datetime import datetime\n",
    "import calendar\n",
    "import os\n",
    "from pyspark.sql import SparkSession\n",
    "from pyspark.sql.functions import udf, col\n",
    "from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format\n",
    "from pyspark.sql.functions import monotonically_increasing_id\n",
    "\n",
    "config = configparser.ConfigParser()\n",
    "config.read('dl.cfg')\n",
    "\n",
    "os.environ['AWS_ACCESS_KEY_ID']=config['KEYS']['AWS_ACCESS_KEY_ID']\n",
    "os.environ['AWS_SECRET_ACCESS_KEY']=config['KEYS']['AWS_SECRET_ACCESS_KEY']\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "def create_spark_session():\n",
    "    spark = SparkSession \\\n",
    "        .builder \\\n",
    "        .config(\"spark.jars.packages\", \"org.apache.hadoop:hadoop-aws:2.7.0\") \\\n",
    "        .getOrCreate()\n",
    "    return spark\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "def process_song_data(spark, input_data, output_data):\n",
    "    # get filepath to song data file\n",
    "    song_data = os.path.join(input_data,\"song_data/*/*/*/*.json\")\n",
    "    # read song data file\n",
    "    df = spark.read.json(song_data)\n",
    "    # extract columns to create songs table\n",
    "    songs_table = df['song_id', 'title', 'artist_id', 'year', 'duration']\n",
    "    \n",
    "    # write songs table to parquet files partitioned by year and artist\n",
    "    songs_table.write.partitionBy('year', 'artist_id').parquet(os.path.join(output_data, 'songs.parquet'), 'overwrite')\n",
    "    print(\"--- songs.parquet completed ---\")\n",
    "    # extract columns to create artists table\n",
    "    artists_table = df['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']\n",
    "    \n",
    "    # write artists table to parquet files\n",
    "    artists_table.write.parquet(os.path.join(output_data, 'artists.parquet'), 'overwrite')\n",
    "    print(\"--- artists.parquet completed ---\")\n",
    "    print(\"*** process_song_data completed ***\\n\\n\")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "def process_log_data(spark, input_data, output_data):\n",
    "    # get filepath to log data file\n",
    "    log_data =os.path.join(input_data,\"log_data/*/*/*.json\")\n",
    "\n",
    "    # read log data file\n",
    "    df = spark.read.json(log_data)\n",
    "    \n",
    "    # filter by actions for song plays\n",
    "#     songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent\n",
    "    songplays_table = df['ts', 'userId', 'level','sessionId', 'location', 'userAgent']\n",
    "\n",
    "    # extract columns for users table \n",
    "#     user_id, first_name, last_name, gender, level\n",
    "    users_table = df['userId', 'firstName', 'lastName', 'gender', 'level']\n",
    "    \n",
    "    # write users table to parquet files\n",
    "    users_table.write.parquet(os.path.join(output_data, 'users.parquet'), 'overwrite')\n",
    "    print(\"--- users.parquet completed ---\")\n",
    "    # create timestamp column from original timestamp column\n",
    "    get_timestamp = udf(lambda x: str(int(int(x)/1000)))\n",
    "    df = df.withColumn('timestamp', get_timestamp(df.ts))\n",
    "    # create datetime column from original timestamp column\n",
    "    get_datetime = udf(lambda x: datetime.fromtimestamp(int(int(x)/1000)))\n",
    "    get_week = udf(lambda x: calendar.day_name[x.weekday()])\n",
    "    get_weekday = udf(lambda x: x.isocalendar()[1])\n",
    "    get_hour = udf(lambda x: x.hour)\n",
    "    get_day = udf(lambda x : x.day)\n",
    "    get_year = udf(lambda x: x.year)\n",
    "    get_month = udf(lambda x: x.month)\n",
    "    \n",
    "   \n",
    "    df = df.withColumn('start_time', get_datetime(df.ts))\n",
    "    df = df.withColumn('hour', get_hour(df.start_time))\n",
    "    df = df.withColumn('day', get_day(df.start_time))\n",
    "    df = df.withColumn('week', get_week(df.start_time))\n",
    "    df = df.withColumn('month', get_month(df.start_time))\n",
    "    df = df.withColumn('year', get_year(df.start_time))\n",
    "    df = df.withColumn('weekday', get_weekday(df.start_time))\n",
    "    time_table  = df['start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday']\n",
    "    \n",
    "    # write time table to parquet files partitioned by year and month\n",
    "    time_table.write.partitionBy('year', 'month').parquet(os.path.join(output_data, 'time.parquet'), 'overwrite')\n",
    "    print(\"--- time.parquet completed ---\")\n",
    "    # read in song data to use for songplays table\n",
    "    song_df = spark.read.parquet(\"songs.parquet\")\n",
    "\n",
    "    # extract columns from joined song and log datasets to create songplays table \n",
    "    df = df.join(song_df, song_df.title == df.song)\n",
    "    songplays_table = df['start_time', 'userId', 'level', 'song_id', 'artist_id', 'sessionId', 'location', 'userAgent']\n",
    "    songplays_table.select(monotonically_increasing_id().alias('songplay_id')).collect()\n",
    "    # write songplays table to parquet files partitioned by year and month\n",
    "    songplays_table.write.parquet(os.path.join(output_data, 'songplays.parquet'), 'overwrite')\n",
    "    print(\"--- songplays.parquet completed ---\")\n",
    "    print(\"*** process_log_data completed ***\\n\\nEND\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "\n",
    "spark = create_spark_session()\n",
    "input_data = \"s3a://udacity-dend/\"\n",
    "output_data = \"\"\n",
    "\n",
    "# process_song_data(spark, input_data, output_data) \n",
    "# process_log_data(spark, input_data, output_data)\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "song_df = spark.read.parquet(\"songs.parquet\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "song_df.show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# log_data =os.path.join(input_data,\"log_data/2018/11/2018-11-12-events.json\")\n",
    "\n",
    "# # read log data file\n",
    "# df = spark.read.json(log_data)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# df.show(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# df = df.join(song_df, song_df.title == df.song)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# df.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# os.system(\"rm -rf users\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
